#include <sys/ipc.h>
#include <sys/shm.h>

#include "config.h"
#include "iser.h"
#include "core.h"
#include "iser_rdma.h"
#include "dbg.h"
#include "rdma_event.h"
#include "mem_pool.h"

#define MAX_POLL_WC 512
#define MAX_NUM_DELAYED_ARM 16

//static size_t buf_pool_sz_mb = DEFAULT_POOL_SIZE_MB;

static int num_delayed_arm;
static LIST_HEAD(iser_dev_list);

static __thread int private_num_delayed_arm;
__thread struct list_head private_iser_dev_list;

extern int membuf_num;
extern size_t membuf_size;

static void iser_poll_cq_armable(struct iser_device *dev);

/* Return the delayed-arm counter for the current execution context:
 * the thread-local counter when running on a polling core, otherwise
 * the process-wide one. */
int *get_num_delayed_arm()
{
        return core_self() ? &private_num_delayed_arm : &num_delayed_arm;
}

/* Select the device list for the current context: the thread-local list
 * on a polling core, the shared global list otherwise. */
static inline struct list_head *get_iser_dev_list()
{
        return core_self() ? &private_iser_dev_list : &iser_dev_list;
}

/* Look up the iser_device wrapping cm_id's verbs context in the
 * context-local device list.  On a hit, store it in *_dev and return 1;
 * return 0 when no matching device exists. */
int iser_dev_find(struct iser_device **_dev, struct rdma_cm_id *cm_id)
{
        struct iser_device *pos;
        struct list_head *head = get_iser_dev_list();

        list_for_each_entry(pos, head, list) {
                if (pos->ibv_ctxt != cm_id->verbs)
                        continue;

                *_dev = pos;
                return 1;
        }

        return 0;
}

#if 0
/* Legacy RDMA pool allocator — compiled out, kept for reference.
 * Tries a SysV hugepage shared-memory segment first, falling back to
 * valloc() when hugepages are unavailable.  On the fallback path
 * *shmid is set to -1 so the matching free routine knows which
 * deallocator to use. */
static uint8_t* iser_alloc_pool(size_t pool_size, int *shmid)
{
        int shmemid;
        uint8_t *buf;

        /* allocate memory */
        shmemid = shmget(IPC_PRIVATE, pool_size,
                        SHM_HUGETLB | IPC_CREAT | SHM_R | SHM_W);

        if (shmemid < 0) {
                DERROR("shmget rdma pool sz:%zu failed\n", pool_size);
                goto failed_huge_page;
        }

        /* get pointer to allocated memory */
        buf = shmat(shmemid, NULL, 0);

        if (buf == (void*)-1) {
                DERROR("Shared memory attach failure (errno=%d %m)", errno);
                shmctl(shmemid, IPC_RMID, NULL);
                goto failed_huge_page;
        }

        /* Mark the segment 'to be destroyed' now that we are attached:
         * the kernel releases the HugePage resources once the last user
         * detaches, even if this process is killed uncleanly.  Per the
         * shmctl man page it is unlikely to fail here. */
        if (shmctl(shmemid, IPC_RMID, NULL)) {
                DERROR("Shared memory contrl mark 'to be destroyed' failed (errno=%d %m)", errno);
        }

        DINFO("Allocated huge page sz:%zu\n", pool_size);
        *shmid = shmemid;
        return buf;

 failed_huge_page:
        /* fallback: plain page-aligned allocation, flagged by shmid = -1 */
        *shmid = -1;
        return valloc(pool_size);
}

/* Counterpart of iser_alloc_pool(): detach the shm segment when one was
 * used (shmid >= 0), otherwise free the valloc'd fallback buffer. */
static void iser_free_pool(uint8_t *pool_buf, int shmid) {
        if (shmid >= 0) {
                if (shmdt(pool_buf) != 0) {
                        DERROR("shmem detach failure (errno=%d %m)", errno);
                }
        } else {
                free(pool_buf);
        }
}

#endif
/* Non-hugepage RDMA buffer pool initializer.  The original
 * shmem/valloc-backed implementation is kept below under "#if 0" for
 * reference; today this is a no-op that always returns 0. */
static int iser_init_rdma_buf_pool(struct iser_device *dev)
{
        (void)dev;  /* unused while the pool code is disabled */
#if 0
        int ret, i;
        uint8_t *pool_buf, *list_buf;
        size_t pool_size, list_size;
        struct iser_membuf *rdma_buf;
        int shmid;

        /* The membuf size is rounded up at initialization time to the hardware
         * page size so that allocations for direct IO devices are aligned. */

        membuf_size = roundup(membuf_size, PAGE_SIZE);
        pool_size = buf_pool_sz_mb * 1024 * 1024;
        membuf_num = pool_size / membuf_size;
        pool_size = membuf_num * membuf_size; /* reflect possible round-down */
        pool_buf = iser_alloc_pool(pool_size, &shmid);
        if (!pool_buf) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        list_size = membuf_num * sizeof(*rdma_buf);
        ret = ymalloc((void **)&list_buf, list_size);
        if (ret) {
                GOTO(err_free_pool, ret);
        }

        /* One pool of registered memory per PD */
        dev->membuf_mr = ibv_reg_mr(dev->pd, pool_buf, pool_size,
                                    IBV_ACCESS_LOCAL_WRITE);
        if (!dev->membuf_mr) {
                ret = errno;
                GOTO(err_free_list, ret);
        }

        DINFO("pool buf:%p list:%p mr:%p lkey:0x%x\n",
                pool_buf, list_buf, dev->membuf_mr, dev->membuf_mr->lkey);

        dev->rdma_hugetbl_shmid = shmid;
        dev->membuf_regbuf = pool_buf;
        dev->membuf_listbuf = list_buf;
        INIT_LICH_LIST_HEAD(&dev->membuf_free);
        INIT_LICH_LIST_HEAD(&dev->membuf_alloc);

        for (i = 0; i < membuf_num; i++) {
                rdma_buf = (void *) list_buf;
                list_buf += sizeof(*rdma_buf);

                list_add_tail(&rdma_buf->pool_list, &dev->membuf_free);
                INIT_LICH_LIST_HEAD(&rdma_buf->task_list);
                rdma_buf->addr = pool_buf;
                rdma_buf->size = membuf_size;
                rdma_buf->rdma = 1;

                pool_buf += membuf_size;
        }

        return 0;
err_free_list:
        free(list_buf);
err_free_pool:
        iser_free_pool(pool_buf, shmid);
err_ret:
        return ret;
#endif
        return 0;
}

/* Initialize the hugepage-path RDMA buffer descriptor pool for dev:
 * one array of 2 * membuf_num descriptors (a write-side and a read-side
 * descriptor per membuf), threaded onto the write/read free lists.
 * Returns 0 on success or a positive error code from ymalloc(). */
static int iser_init_rdma_buf_pool_hugepage(struct iser_device *dev)
{
        int ret, i;
        uint8_t *list_buf;
        size_t list_size;
        struct iser_membuf *rdma_buf;

        /* One descriptor pair (write + read) per membuf. */
        list_size = membuf_num * sizeof(*rdma_buf) * 2;
        ret = ymalloc((void **)&list_buf, list_size);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        /* Record the array's base pointer: the destroy path frees
         * dev->membuf_listbuf, which was previously never set here
         * (list_buf is advanced below, so the base would be lost). */
        dev->membuf_listbuf = list_buf;

        INIT_LIST_HEAD(&dev->membuf_read_free);
        INIT_LIST_HEAD(&dev->membuf_write_free);
        INIT_LIST_HEAD(&dev->membuf_write_alloc);
        INIT_LIST_HEAD(&dev->membuf_read_alloc);

        /* NOTE(review): the loop stops at membuf_num - 1, leaving the
         * last allocated descriptor pair unused — confirm whether the
         * off-by-one is intentional before changing the bound. */
        for (i = 0; i < membuf_num - 1; i++) {
                /* write-side descriptor */
                rdma_buf = (void *)list_buf;
                list_buf += sizeof(*rdma_buf);

                list_add_tail(&rdma_buf->pool_list, &dev->membuf_write_free);
                INIT_LIST_HEAD(&rdma_buf->task_list);
                rdma_buf->size = membuf_size;
                rdma_buf->rdma = 1;
                mbuffer_init(&rdma_buf->buf, 0);

                /* read-side descriptor: initialize the same fields as
                 * the write side (size and task_list were previously
                 * left uninitialized on this path). */
                rdma_buf = (void *)list_buf;
                list_buf += sizeof(*rdma_buf);

                list_add_tail(&rdma_buf->pool_list, &dev->membuf_read_free);
                INIT_LIST_HEAD(&rdma_buf->task_list);
                rdma_buf->size = membuf_size;
                rdma_buf->rdma = 1;
                mbuffer_init(&rdma_buf->buf, 0);
        }

        return 0;
err_ret:
        return ret;
}

/* Tear down the non-hugepage RDMA buffer pool.  Currently a no-op —
 * the matching pool setup in iser_init_rdma_buf_pool() is compiled out;
 * the original teardown is kept below under "#if 0" for reference. */
static void iser_destroy_rdma_buf_pool(struct iser_device *dev)
{
        (void)dev;  /* unused while the pool code is disabled */
#if 0       
        int ret;

        YASSERT(list_empty(&dev->membuf_alloc));

        ret = iser_dev_deregmr(dev->membuf_mr);
        if (ret)
                DERROR("ibv_dereg_mr failed: (errno=%d %m)\n", errno);

        iser_free_pool(dev->membuf_regbuf, dev->rdma_hugetbl_shmid);
        free(dev->membuf_listbuf);

        dev->membuf_mr = NULL;
        dev->membuf_regbuf = NULL;
        dev->membuf_listbuf = NULL;
#endif
        return ;
}

#if USE_HUGE_PAGE
/* Free the hugepage-path buffer descriptor array allocated by
 * iser_init_rdma_buf_pool_hugepage().  Both alloc lists must be empty,
 * i.e. every buffer has been returned to its free list. */
static void iser_destroy_rdma_buf_pool_hugepage(struct iser_device *dev)
{
        YASSERT(list_empty(&dev->membuf_read_alloc));
        YASSERT(list_empty(&dev->membuf_write_alloc));

        /* NOTE(review): this frees dev->membuf_listbuf, but the
         * hugepage init path never stores its ymalloc'd array there —
         * verify the field is set (or fix the init path) before relying
         * on this free. */
        free(dev->membuf_listbuf);

        dev->membuf_mr = NULL;
        dev->membuf_regbuf = NULL;
        dev->membuf_listbuf = NULL;
}
#endif

/*
 * Could read as many entries as possible without blocking, but
 * that just fills up a list of tasks.  Instead pop out of here
 * so that tx progress, like issuing rdma reads and writes, can
 * happen periodically.
 */


/*
 * Drain up to max_wc work completions from dev's CQ in batches of 8.
 * Successful completions go to handle_wc(), failed ones to
 * handle_wc_error().
 *
 * Returns 0 when the CQ was emptied, 1 when the max_wc budget was hit
 * with completions possibly still pending, or the negative
 * ibv_poll_cq() error code on failure.
 */
static int __iser_poll_cq(struct iser_device *dev, int max_wc)
{
        int ret = 0, numwc = 0, count = 0;
        struct ibv_wc wc[8];

        for (;;) {
                ret = ibv_poll_cq(dev->cq, 8, wc);
                if (ret == 0) /* no completions retrieved */
                        break;

                if (unlikely(ret < 0)) {
                        DERROR("ibv_poll_cq failed, ret=%d\n", ret);
                        break;
                }

                for (count = 0; count < ret; count++) {
                        if (likely(wc[count].status == IBV_WC_SUCCESS))
                                handle_wc(&wc[count]);
                        else
                                handle_wc_error(&wc[count]);

                        numwc++;
                }

                /* ">=" rather than "==": a batch can overshoot max_wc
                 * when it is not a multiple of the batch size, and "=="
                 * would then miss the stop condition entirely. */
                if (numwc >= max_wc) {
                        ret = 1;
                        break;
                }
        }

        return ret;
}
/*
 * Public CQ drain used by polling cores; same semantics as
 * __iser_poll_cq(): returns 0 when the CQ was emptied, 1 when the
 * max_wc budget was hit with completions possibly still pending, or a
 * negative ibv_poll_cq() error code.  The dead "#if 1 / #else"
 * duplicate of __iser_poll_cq() that used to surround this function
 * has been removed.
 */
int iser_poll_cq(struct iser_device *dev, int max_wc)
{
        int ret = 0, numwc = 0, count = 0;
        struct ibv_wc wc[8];

        for (;;) {
                ret = ibv_poll_cq(dev->cq, 8, wc);
                if (ret == 0) /* no completions retrieved */
                        break;

                if (unlikely(ret < 0)) {
                        DERROR("ibv_poll_cq failed, ret=%d\n", ret);
                        break;
                }

                for (count = 0; count < ret; count++) {
                        if (likely(wc[count].status == IBV_WC_SUCCESS))
                                handle_wc(&wc[count]);
                        else
                                handle_wc_error(&wc[count]);

                        numwc++;
                }

                /* ">=" so a batch overshooting max_wc still stops */
                if (numwc >= max_wc) {
                        ret = 1;
                        break;
                }
        }

        return ret;
}
/* iser_sched_consume_cq() is scheduled to consume completion events that
 * could arrive after the cq had been seen empty, but just before
 * the interrupts were re-armed.
 * Intended to consume those remaining completions only, the function
 * does not re-arm interrupts, but polls the cq until it's empty.
 * As we always limit the number of completions polled at a time, we may
 * need to schedule this function a few times.
 * It may happen that during this process new completions occur, and
 * we get an interrupt about that. Some of the "new" completions may be
 * processed by the self-scheduling iser_sched_consume_cq(), which is
 * a good thing, because we don't need to wait for the interrupt event.
 * When the interrupt notification arrives, its handler will remove the
 * scheduled event, and call iser_poll_cq_armable(), so that the polling
 * cycle resumes normally.
 */
static void iser_sched_consume_cq(struct event_data *tev)
{
        struct iser_device *dev = tev->data;

        if (__iser_poll_cq(dev, MAX_POLL_WC) <= 0)
                return;

        /* poll budget exhausted: completions may remain, go again */
        dev->poll_sched.sched_handler = iser_sched_consume_cq;
        iser_sched_add_event(&dev->poll_sched);
}

static void iser_rearm_completions(struct iser_device *dev)
{
        int ret;
        int *num = get_num_delayed_arm();

        ret = ibv_req_notify_cq(dev->cq, 0);
        if (unlikely(ret))
                DERROR("ibv_req_notify_cq failed\n");

        dev->poll_sched.sched_handler = iser_sched_consume_cq;
        iser_sched_add_event(&dev->poll_sched);

        *num = 0;
}

/* Scheduled to poll cq after a completion event has been
 * received and acknowledged, if no more completions are found
 * the interrupts are re-armed */
static void iser_sched_poll_cq(struct event_data *tev)
{
        /* trampoline: resume armable polling for the owning device */
        iser_poll_cq_armable((struct iser_device *)tev->data);
}

/* Poll the CQ with interrupts disarmed.  After MAX_NUM_DELAYED_ARM
 * consecutive empty polls (or on poll error) re-arm the interrupts;
 * otherwise reschedule another poll pass. */
static void iser_poll_cq_armable(struct iser_device *dev)
{
        int *delayed = get_num_delayed_arm();
        int rc = __iser_poll_cq(dev, MAX_POLL_WC);

        if (unlikely(rc < 0)) {
                iser_rearm_completions(dev);
                return;
        }

        if (rc == 0 && ++(*delayed) == MAX_NUM_DELAYED_ARM) {
                /* CQ stayed empty long enough — give up and arm */
                iser_rearm_completions(dev);
        } else {
                /* keep polling without interrupts */
                dev->poll_sched.sched_handler = iser_sched_poll_cq;
                iser_sched_add_event(&dev->poll_sched);
        }
}

/*
 * Called from main event loop when a CQ notification is available.
 */
/* Event-loop callback for a CQ completion-channel notification:
 * retrieve and ack the event, cancel any pending scheduled poll for
 * this device, then enter the armable polling cycle. */
static void iser_handle_cq_event(int fd __attribute__ ((unused)),
                                 int type __attribute__((unused)),
                                 int events __attribute__ ((unused)),
                                 void *data,
                                 void *private_mem)
{
        struct iser_device *dev = data;
        void *cq_context;
        int ret;
        (void)private_mem;

        ret = ibv_get_cq_event(dev->cq_channel, &dev->cq, &cq_context);
        if (unlikely(ret != 0)) {
                /* Just print the log message, if that was a serious problem,
                  * it will express itself elsewhere */
                DERROR("failed to retrieve CQ event, cq:%p\n", dev->cq);
                return;
        }

        /* ack immediately so ibv_destroy_cq() can't block on unacked events */
        ibv_ack_cq_events(dev->cq, 1);

        /* if a poll was previously scheduled, remove it,
         * as it will be scheduled when necessary */
        if (dev->poll_sched.scheduled)
                iser_sched_remove_event(&dev->poll_sched);

        iser_poll_cq_armable(dev);
}

/*
 * Called from main event loop when async event is available.
 */
/* Event-loop callback for verbs asynchronous events on dev->ibv_ctxt.
 * Every event type is only logged, except IBV_EVENT_COMM_EST which
 * additionally forces a "connection established" notification; the
 * event is always acknowledged before returning. */
static void iser_handle_async_event(int fd __attribute__ ((unused)),
                                    int type __attribute__ ((unused)),
                                    int events __attribute__ ((unused)),
                                    void *data,
                                    void *private_mem)
{
        struct iser_device *dev = data;
        char *dev_name = dev->ibv_ctxt->device->name;
        struct ibv_async_event async_event;
        struct iser_conn *conn;
        
        (void)private_mem;
        if (ibv_get_async_event(dev->ibv_ctxt, &async_event)) {
                DERROR("ibv_get_async_event failed\n");
                return;
        }

        switch (async_event.event_type) {
        case IBV_EVENT_COMM_EST:
                /* qp_context was set to the owning iser_conn at QP creation */
                conn = async_event.element.qp->qp_context;
                DERROR("conn:%p cm_id:%p dev:%s, QP evt: %s\n",
                        conn, conn->cm_id, dev_name,
                        ibv_event_type_str(IBV_EVENT_COMM_EST));
                /* force "connection established" event */
                iser_rdma_notify(conn->cm_id);
                break;

        /* rest of QP-related events */
        case IBV_EVENT_QP_FATAL:
        case IBV_EVENT_QP_REQ_ERR:
        case IBV_EVENT_QP_ACCESS_ERR:
        case IBV_EVENT_SQ_DRAINED:
        case IBV_EVENT_PATH_MIG:
        case IBV_EVENT_PATH_MIG_ERR:
        case IBV_EVENT_QP_LAST_WQE_REACHED:
                conn = async_event.element.qp->qp_context;
                DERROR("conn:%p cm_id:%p dev:%s, QP evt: %s\n",
                        conn, conn->cm_id, dev_name,
                        ibv_event_type_str(async_event.event_type));
                break;

        /* CQ-related events */
        case IBV_EVENT_CQ_ERR:
                DERROR("dev:%s CQ evt: %s\n", dev_name,
                        ibv_event_type_str(async_event.event_type));
                break;

        /* SRQ events */
        case IBV_EVENT_SRQ_ERR:
        case IBV_EVENT_SRQ_LIMIT_REACHED:
                DERROR("dev:%s SRQ evt: %s\n", dev_name,
                        ibv_event_type_str(async_event.event_type));
                break;

        /* Port events */
        case IBV_EVENT_PORT_ACTIVE:
        case IBV_EVENT_PORT_ERR:
        case IBV_EVENT_LID_CHANGE:
        case IBV_EVENT_PKEY_CHANGE:
        case IBV_EVENT_SM_CHANGE:
        case IBV_EVENT_CLIENT_REREGISTER:
                DERROR("dev:%s port:%d evt: %s\n",
                        dev_name, async_event.element.port_num,
                        ibv_event_type_str(async_event.event_type));
                break;

        /* HCA events */
        case IBV_EVENT_DEVICE_FATAL:
                DERROR("dev:%s HCA evt: %s\n", dev_name,
                        ibv_event_type_str(async_event.event_type));
                break;

        default:
                DERROR("dev:%s evt: %s\n", dev_name,
                        ibv_event_type_str(async_event.event_type));
                break;
        }

        /* must ack, otherwise ibv_destroy_* of the element can block */
        ibv_ack_async_event(&async_event);
}

int iser_dev_create(struct iser_device **_dev, struct rdma_cm_id *cm_id)
{
        int ret, cqe_num;
        struct iser_device *dev;

        ret = ymalloc((void **)&dev, sizeof(*dev));
        if (ret) {
                GOTO(err_ret, ret);
        }

        dev->ibv_ctxt = cm_id->verbs;

        dev->pd = ibv_alloc_pd(dev->ibv_ctxt);
        if (dev->pd == NULL) {
                ret = errno;
                GOTO(err_free, ret);
        }

        dev->membuf_mr = NULL;
#if USE_HUGE_PAGE
        if (core_self()) {
                ret = iser_init_rdma_buf_pool_hugepage(dev);
                if (ret) {
                        GOTO(err_free, ret);
                }
        } else {
#endif
                ret = iser_init_rdma_buf_pool(dev);
                if (ret) {
                        GOTO(err_free, ret);
                }
#if USE_HUGE_PAGE
        }
#endif

        ret = ibv_query_device(dev->ibv_ctxt, &dev->device_attr);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_free, ret);
        }

        cqe_num = min(dev->device_attr.max_cqe, MAX_CQ_ENTRIES);
        DINFO("max %d CQEs\n", cqe_num);

        dev->cq_channel = ibv_create_comp_channel(dev->ibv_ctxt);
        if (dev->cq_channel == NULL) {
                ret = errno;
                GOTO(err_free, ret);
        }

        core_t * core = core_self();
        if (core) {
                dev->cq = ibv_create_cq(dev->ibv_ctxt, cqe_num, NULL,
                                NULL, 0);
                DINFO("----------------------new device %p cq %p create \n", dev, dev->cq);
                if (dev->cq == NULL) {
                        ret = errno;
                        GOTO(err_free, ret);
                }

                /** 对polling core整体内存进行注册，
                 * 只需注册一次,被所有connection共用
                 **/
                dev->ff_resources_mr = ibv_reg_mr(dev->pd, core->tls[VARIABLE_CORE],
                                                get_core_mempages_size(),
                                                IBV_ACCESS_LOCAL_WRITE
                                                | IBV_ACCESS_REMOTE_READ
                                                | IBV_ACCESS_REMOTE_WRITE);
                if (!dev->ff_resources_mr) {
                        DERROR("ibv_reg_mr failed, pd: %p\n", dev->pd);
                        YASSERT(0);
                        GOTO(err_ret, ret);
                }
        } else {
                dev->cq = ibv_create_cq(dev->ibv_ctxt, cqe_num, NULL,
                                dev->cq_channel, 1);
                DINFO("----------------------new device %p cq %p create \n", dev, dev->cq);
                if (dev->cq == NULL) {
                        ret = errno;
                        GOTO(err_free, ret);
                }

                iser_sched_init_event(&dev->poll_sched, iser_sched_poll_cq, dev);

                ret = ibv_req_notify_cq(dev->cq, 0);
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }

                ret = rdma_event_add(dev->cq_channel->fd, ISER_EV_FD, EPOLLIN,
                                iser_handle_cq_event, dev, NULL);
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }

                ret = rdma_event_add(dev->ibv_ctxt->async_fd, ISER_EV_FD, EPOLLIN,
                                iser_handle_async_event, dev, NULL);
                if (ret)
                        GOTO(err_free, ret);
        }

        list_add_tail(&dev->list, get_iser_dev_list());

        *_dev = dev;
        //if (core_self())
         //       core_self()->iser_dev = (void *)dev;

        return 0;
err_free:
        yfree((void **)&dev);
err_ret:
        return ret;
}

/* Register data_pool (pool_size bytes) with pd for local write access.
 * On success stores the new MR in *_mr and returns 0; otherwise returns
 * the errno left by ibv_reg_mr(). */
int iser_dev_regmr(struct ibv_mr **_mr, struct ibv_pd *pd,
                        void *data_pool, size_t pool_size)
{
        int ret;
        struct ibv_mr *mr = ibv_reg_mr(pd, data_pool, pool_size,
                                       IBV_ACCESS_LOCAL_WRITE);

        if (mr == NULL) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        *_mr = mr;
        return 0;
err_ret:
        return ret;
}

/* Thin wrapper around ibv_dereg_mr(); returns its status (0 on success). */
int iser_dev_deregmr(struct ibv_mr *mr)
{
        return ibv_dereg_mr(mr);
}

/* Post the receive WR chain rooted at task->rxd.recv_wr to conn's QP.
 * On success all num_recv_bufs WRs are counted as posted; on failure
 * the chain is walked up to bad_wr to count how many actually made it.
 * One connection reference is taken per posted WR via iser_conn_getn().
 * Returns the ibv_post_recv() status (0 on success). */
int iser_dev_post_recv(struct iser_conn *conn,
                          struct iser_task *task,
                          int num_recv_bufs)
{
        int ret, nr_posted;
        struct ibv_recv_wr *bad_wr;

        ret = ibv_post_recv(conn->qp_hndl, &task->rxd.recv_wr, &bad_wr);
        if (likely(!ret)) {
                nr_posted = num_recv_bufs;
                /*
                DINFO("conn:%p posted:%d 1st task:%p "
                        "wr_id:0x%lx sge_sz:%u\n",
                        conn, nr_posted, task,
                        (unsigned long)task->rxd.recv_wr.wr_id,
                        task->rxd.sge.length);
                */
        } else {
                struct ibv_recv_wr *wr;

                /* bad_wr marks the first WR that failed to post */
                nr_posted = 0;
                for (wr = &task->rxd.recv_wr; wr != bad_wr; wr = wr->next)
                        nr_posted++;

                DERROR("%m, conn:%p posted:%d/%d 1st task:%p "
                        "wr_id:0x%lx sge_sz:%u\n",
                        conn, nr_posted, num_recv_bufs, task,
                        (unsigned long)task->rxd.recv_wr.wr_id,
                        task->rxd.sge[0].length);
        }

        /* one conn ref per WR that actually reached the hardware */
        iser_conn_getn(conn, nr_posted);

        return ret;
}

/* Post the send WR chain rooted at iser_send->send_wr to conn's QP.
 * Mirrors iser_dev_post_recv(): counts partially-posted WRs on failure
 * via bad_wr, and takes one connection reference per posted WR.
 * Returns the ibv_post_send() status (0 on success). */
int iser_dev_post_send(struct iser_conn *conn,
                          struct iser_work_req *iser_send,
                          int num_send_reqs)
{
        int ret, nr_posted;
        struct ibv_send_wr *bad_wr;

        ret = ibv_post_send(conn->qp_hndl, &iser_send->send_wr, &bad_wr);
        if (likely(!ret)) {
                nr_posted = num_send_reqs;
                DBUG("conn:%p posted:%d 1st wr:%p wr_id:0x%lx sge_sz:%u\n",
                        conn, nr_posted, iser_send,
                        (unsigned long)iser_send->send_wr.wr_id,
                        iser_send->sge[0].length);
        } else {
                struct ibv_send_wr *wr;

                /* bad_wr marks the first WR that failed to post */
                nr_posted = 0;
                for (wr = &iser_send->send_wr; wr != bad_wr; wr = wr->next)
                        nr_posted++;

                DERROR("%m, conn:%p posted:%d/%d 1st wr:%p wr_id:0x%lx sge_sz:%u\n",
                        conn, nr_posted, num_send_reqs, iser_send,
                        (unsigned long)iser_send->send_wr.wr_id,
                        iser_send->sge[0].length);
        }

        /* one conn ref per WR that actually reached the hardware */
        iser_conn_getn(conn, nr_posted);

        return ret;
}

struct iser_membuf *iser_dev_alloc_write_rdma_buf(struct iser_device *dev)
{
        struct iser_membuf *rdma_buf;

        if (unlikely(list_empty(&dev->membuf_write_free)))
                return NULL;

        rdma_buf = list_entry(dev->membuf_write_free.next, struct iser_membuf,
                                    pool_list);

        list_del(&rdma_buf->pool_list);
 //       list_add_tail(&rdma_buf->pool_list, &dev->membuf_write_alloc);

        DBUG("alloc:%p\n", rdma_buf);
        return rdma_buf;
}

struct iser_membuf *iser_dev_alloc_read_rdma_buf(struct iser_device *dev)
{
        struct iser_membuf *rdma_buf;

        if (unlikely(list_empty(&dev->membuf_read_free)))
                return NULL;

        rdma_buf = list_entry(dev->membuf_read_free.next, struct iser_membuf,
                                    pool_list);

        list_del(&rdma_buf->pool_list);
//        list_add_tail(&rdma_buf->pool_list, &dev->membuf_read_alloc);

        DBUG("alloc:%p\n", rdma_buf);
        return rdma_buf;
}

void iser_dev_free_write_rdma_buf(struct iser_device *dev, struct iser_membuf *rdma_buf)
{


        if (unlikely(!rdma_buf || !rdma_buf->rdma)) {
                DWARN("free a NULL rdma buf!!!!!!!!!!!!!!!!!!!!!!!!!!!\n");
                return;
        }
        DBUG("free %p\n", rdma_buf);
        
        mbuffer_free(&rdma_buf->buf);
        /* add to the free list head to reuse recently used buffers first */
//        list_del(&rdma_buf->pool_list);
        list_add(&rdma_buf->pool_list, &dev->membuf_write_free);
}

void iser_dev_free_read_rdma_buf(struct iser_device *dev, struct iser_membuf *rdma_buf)
{
        if (unlikely(!rdma_buf || !rdma_buf->rdma)) {
                DWARN("free a NULL rdma buf!!!!!!!!!!!!!!!!!!!!!!!!!!!\n");
		YASSERT(0);
                return;
        }
        DBUG("free %p\n", rdma_buf);

        /* add to the free list head to reuse recently used buffers first */

        mbuffer_free(&rdma_buf->buf);
     //   list_del(&rdma_buf->pool_list);
        list_add(&rdma_buf->pool_list, &dev->membuf_read_free);
}

/* Unlink dev from the device list and destroy its verbs resources:
 * event registrations, CQ, completion channel, buffer pool, and PD.
 *
 * NOTE(review): the rdma_event_del/iser_sched_remove_event calls below
 * run unconditionally, but iser_dev_create() only registers those fds
 * and initializes poll_sched on the non-polling-core path — confirm
 * this is never called for a polling-core device with those unset.
 * NOTE(review): ff_resources_mr (registered on the polling-core path)
 * is never deregistered here — possible MR leak; verify. */
void iser_dev_release(struct iser_device *dev)
{
        int ret;

        list_del(&dev->list);

        DINFO("----------------------free device %p cq %p\n", dev, dev->cq);

        rdma_event_del(dev->ibv_ctxt->async_fd);
        rdma_event_del(dev->cq_channel->fd);
        iser_sched_remove_event(&dev->poll_sched);

        ret = ibv_destroy_cq(dev->cq);
        if (ret)
                DERROR("ibv_destroy_cq failed: (errno=%d %m)\n", errno);

        ret = ibv_destroy_comp_channel(dev->cq_channel);
        if (ret)
                DERROR("ibv_destroy_comp_channel failed: (errno=%d %m)\n", errno);

#if USE_HUGE_PAGE
        /* pool teardown mirrors the branching used at creation time */
        if (core_self()) {
                iser_destroy_rdma_buf_pool_hugepage(dev);
        } else {
#endif
                iser_destroy_rdma_buf_pool(dev);
#if USE_HUGE_PAGE
        }
#endif

        ret = ibv_dealloc_pd(dev->pd);
        if (ret)
                DERROR("ibv_dealloc_pd failed: (errno=%d %m)\n", errno);

        yfree((void **)&dev);
}

/* Release every device on the current context's device list, then the
 * global RDMA state.  The _safe iterator is required because
 * iser_dev_release() unlinks (and frees) each node while iterating. */
void iser_ib_release(void)
{
        struct iser_device *dev, *tdev;

        list_for_each_entry_safe(dev, tdev, get_iser_dev_list(), list) {
                iser_dev_release(dev);
        }

        iser_rdma_release();
}

/* Per-thread initialization: set up the thread-local device list used
 * on polling cores (the global list is statically initialized via
 * LIST_HEAD above). */
void iser_dev_init()
{
        INIT_LIST_HEAD(&private_iser_dev_list);
}